// Copyright 2020 The Cockroach Authors.
// Copyright (c) 2022-present, Shanghai Yunxi Technology Co, Ltd. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// This software (KWDB) is licensed under Mulan PSL v2.
// You can use this software according to the terms and conditions of the Mulan PSL v2.
// You may obtain a copy of Mulan PSL v2 at:
//          http://license.coscl.org.cn/MulanPSL2
// THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
// EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
// MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
// See the Mulan PSL v2 for more details.

package colexec

import (
	"context"

	"gitee.com/kwbasedb/kwbase/pkg/col/coldata"
	"gitee.com/kwbasedb/kwbase/pkg/col/coltypes"
)

// initHash, rehash, and finalizeHash work together to compute the hash value
// for an individual key tuple which represents a row's equality columns. Since
// this key is a tuple of various types, rehash is used to apply a
// transformation on the resulting hash value based on an element of the key of
// a specified type.
//
// The combination of these three functions actually defines a hashing function
// family - changing the initial hash value will produce a "different" hash
// function.
//
// We currently use the same hash functions used by Go's maps.
// TODO(asubiotto): Once https://go-review.googlesource.com/c/go/+/155118/ is
// in, we should use the public API.

// defaultInitHashValue is the default initValue to be used in initHash
// function.
const defaultInitHashValue = 1

// uint64OneColumn and uint64TwoColumn are constant columns of length
// coldata.MaxBatchSize filled with the values 1 and 2 respectively. initHash
// copies from them to seed hash buckets for those two initial values.
var (
	uint64OneColumn []uint64
	uint64TwoColumn []uint64
)

// init allocates and fills the two constant columns once, at package load.
func init() {
	ones := make([]uint64, coldata.MaxBatchSize)
	twos := make([]uint64, coldata.MaxBatchSize)
	for idx := range ones {
		ones[idx], twos[idx] = 1, 2
	}
	uint64OneColumn, uint64TwoColumn = ones, twos
}

// rehash takes an element of a key (tuple representing a row of equality
// column values) at a given column and computes a new hash by applying a
// transformation to the existing hash. This function is generated by execgen,
// so it doesn't appear in this file. Look at hash_utils_tmpl.go for the source
// code.

// initHash sets the first nKeys entries of buckets to initValue, the starting
// state that rehash subsequently transforms. Entries at index nKeys and beyond
// are left untouched.
//
// buckets must hold at least nKeys elements, otherwise initHash panics. A
// non-positive nKeys is a no-op.
// NOTE: initValue *must* be non-zero.
func initHash(buckets []uint64, nKeys int, initValue uint64) {
	if nKeys <= 0 {
		return
	}
	// Fail fast, and identically in every branch, when the caller's slice is
	// too short. Without this check the copy-based fast paths below would make
	// no progress once the destination is exhausted and spin forever.
	_ = buckets[nKeys-1]
	// Bound all writes to the requested prefix so that entries past nKeys are
	// never clobbered.
	dst := buckets[:nKeys]

	switch initValue {
	case 1:
		// Bulk-copy from the pre-filled column of ones; the loop handles nKeys
		// larger than the column by copying repeatedly.
		for n := 0; n < len(dst); n += copy(dst[n:], uint64OneColumn) {
		}
	case 2:
		// Same as above, using the pre-filled column of twos.
		for n := 0; n < len(dst); n += copy(dst[n:], uint64TwoColumn) {
		}
	default:
		for i := range dst {
			dst[i] = initValue
		}
	}
}

// finalizeHash reduces, in place, each of the first nKeys hash values in
// buckets modulo numBuckets so that every value identifies one of numBuckets
// buckets. Entries at index nKeys and beyond are not modified.
func finalizeHash(buckets []uint64, nKeys int, numBuckets uint64) {
	mask := numBuckets - 1
	if numBuckets&mask != 0 {
		// numBuckets is not a power of two: fall back to a real modulo.
		for idx := 0; idx < nKeys; idx++ {
			buckets[idx] %= numBuckets
		}
		return
	}
	// numBuckets is a power of two, so "hash % numBuckets" can be computed as
	// "hash & (numBuckets - 1)". The bitwise form was measured to improve
	// benchmark performance by 20%.
	for idx := 0; idx < nKeys; idx++ {
		buckets[idx] &= mask
	}
}

// tupleHashDistributor is a helper struct that distributes tuples from batches
// according to the corresponding hashes. The "distribution" occurs by
// populating selection vectors which the caller needs to use accordingly.
//
// distribute reuses the buckets and selections slices on every call, so the
// result returned by one call is overwritten by the next.
type tupleHashDistributor struct {
	// initHashValue is the value used to initialize the hash buckets. Different
	// values can be used to define different hash functions. It is passed to
	// initHash, which requires it to be non-zero.
	initHashValue uint64
	// buckets will contain the computed hash value of a group of columns with
	// the same index in the current batch. After finalizeHash runs inside
	// distribute, each entry holds the output index for that tuple.
	buckets []uint64
	// selections stores the selection vectors that actually define how to
	// distribute the tuples from the batch. It has one entry per output;
	// resetNumOutputs changes its length.
	selections [][]int
	// cancelChecker is used during the hashing of the rows to distribute to
	// check for query cancellation. It is handed to rehash on every call.
	cancelChecker  CancelChecker
	// decimalScratch is handed to rehash on every call.
	// NOTE(review): presumably scratch space for hashing decimal columns —
	// confirm against hash_utils_tmpl.go.
	decimalScratch decimalOverloadScratch
}

// newTupleHashDistributor returns a distributor that seeds its hashes with
// initHashValue and spreads tuples across numOutputs outputs. The hash buckets
// and every per-output selection vector are sized using coldata.BatchSize().
func newTupleHashDistributor(initHashValue uint64, numOutputs int) *tupleHashDistributor {
	d := &tupleHashDistributor{
		initHashValue: initHashValue,
		buckets:       make([]uint64, coldata.BatchSize()),
		selections:    make([][]int, numOutputs),
	}
	// Each output starts with an empty selection vector that already has room
	// for a full batch.
	for idx := range d.selections {
		d.selections[idx] = make([]int, 0, coldata.BatchSize())
	}
	return d
}

// distribute hashes the hashCols columns of every tuple in b and assigns each
// tuple to one of the outputs configured on d. types is indexed by column
// ordinal and supplies the type of each hashed column.
//
// The result has one selection vector per output. Each vector lists the tuple
// indices (taken from b's selection vector when b has one, otherwise the
// tuple's position in the batch) that hashed to that output. The vectors are
// owned by d and are rewritten by the next call.
func (d *tupleHashDistributor) distribute(
	ctx context.Context, b coldata.Batch, types []coltypes.T, hashCols []uint32,
) [][]int {
	numTuples := b.Length()
	initHash(d.buckets, numTuples, d.initHashValue)

	// Fold each hash column into the running per-tuple hash.
	for _, colIdx := range hashCols {
		rehash(
			ctx, d.buckets, types[colIdx], b.ColVec(int(colIdx)), numTuples,
			b.Selection(), d.cancelChecker, d.decimalScratch,
		)
	}

	// Turn every hash into an output ordinal.
	finalizeHash(d.buckets, numTuples, uint64(len(d.selections)))

	// Empty the vectors left over from the previous call, keeping capacity.
	for outIdx := range d.selections {
		d.selections[outIdx] = d.selections[outIdx][:0]
	}

	// Route each tuple to the selection vector of its output.
	sel := b.Selection()
	if sel == nil {
		// No selection vector: the tuple index is its position in the batch.
		for tupleIdx, outIdx := range d.buckets[:numTuples] {
			d.selections[outIdx] = append(d.selections[outIdx], tupleIdx)
		}
		return d.selections
	}
	// With a selection vector, the hash at position pos belongs to the tuple
	// stored at sel[pos].
	for pos, tupleIdx := range sel[:numTuples] {
		outIdx := d.buckets[pos]
		d.selections[outIdx] = append(d.selections[outIdx], tupleIdx)
	}
	return d.selections
}

// resetNumOutputs sets up the tupleHashDistributor to distribute the tuples
// to a different number of outputs. Selection vectors that already exist are
// kept as they are; every slot in [0, numOutputs) that has no vector yet gets
// a fresh empty one with capacity coldata.BatchSize().
func (d *tupleHashDistributor) resetNumOutputs(numOutputs int) {
	if cap(d.selections) >= numOutputs {
		d.selections = d.selections[:numOutputs]
	} else {
		// Reuse everything the backing array already holds, then extend it.
		d.selections = d.selections[:cap(d.selections)]
		for len(d.selections) < numOutputs {
			d.selections = append(d.selections, nil)
		}
	}
	// Slots exposed from spare capacity may never have been initialized
	// (append over-allocates and zeroes the tail), so preallocate any nil
	// vector here rather than letting distribute grow it from nil.
	for i := range d.selections {
		if d.selections[i] == nil {
			d.selections[i] = make([]int, 0, coldata.BatchSize())
		}
	}
}
